package com.isyscore.os.metadata.kettle.step;

import com.alibaba.fastjson.JSONObject;
import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowHup;
import com.isyscore.os.metadata.kettle.vis.ExecSQLNode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import com.isyscore.os.metadata.utils.SqlUtil;
import com.isyscore.os.metadata.utils.StringEscapeHelper;
import org.apache.commons.lang3.StringUtils;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.randomvalue.RandomValueMeta;
import org.pentaho.di.trans.steps.sql.ExecSQLMeta;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;


/**
 * Converts an {@link ExecSQLNode} from the visual flow definition into a Kettle
 * "ExecSQL" step and, after the transformation has been assembled, optionally
 * prepends a placeholder step so that logs are collected for the step.
 */
public class ExecSQLStep extends AbstractStep {

    /** Name of the placeholder RandomValue step inserted by {@link #after}. */
    private static final String PLACEHOLDER_STEP_NAME = "占位";

    /**
     * Builds the Kettle {@link StepMeta} for an ExecSQL node.
     *
     * <p>The SQL is taken from the node, decoded with {@link StringEscapeHelper#decode},
     * and then passed through {@link SqlUtil#buildSqlWitchSchema} together with a schema
     * derived from the {@code databaseName} stored in the flow's {@code otherInfo} JSON.
     *
     * @param step       the visual node; must be an {@link ExecSQLNode}
     * @param databases  not used by this implementation
     * @param transMeta  transformation whose registered databases are searched for the
     *                   connection named {@code <dataSourceId>-<databaseName>}
     * @param transGraph flow configuration; its {@code otherInfo} is parsed as a JSON array
     * @return the drawable ExecSQL step named after the node
     * @throws Exception if the plugin cannot be loaded or the flow JSON cannot be read
     */
    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        ExecSQLNode node = (ExecSQLNode) step;

        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface plugin = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.ExecSQL.name());
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(plugin);
        ExecSQLMeta execSQLMeta = (ExecSQLMeta) stepMetaInterface;

        String connectionName = node.getDataSourceId() + "-" + node.getDatabaseName();
        execSQLMeta.setDatabaseMeta(DatabaseMeta.findDatabase(transMeta.getDatabases(), connectionName));
        execSQLMeta.setExecutedEachInputRow(true);
        execSQLMeta.setVariableReplacementActive(true);
        execSQLMeta.setArguments(new String[]{});

        String decodedSql = StringEscapeHelper.decode(node.getSql());

        // The database name is read from the FIRST element of the otherInfo array
        // (data.formData.databaseName), not from this node.
        // NOTE(review): every level of this path is dereferenced without a null check, so a
        // missing key or an empty array fails here with an unchecked exception — confirm
        // whether otherInfo is guaranteed to have this shape.
        String databaseName = JSONObject.parseArray(transGraph.getOtherInfo())
                .getJSONObject(0)
                .getJSONObject("data")
                .getJSONObject("formData")
                .getString("databaseName");

        execSQLMeta.setSql(SqlUtil.buildSqlWitchSchema(decodedSql, resolveSchema(databaseName)));

        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.ExecSQL.name(), node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
        return stepMeta;
    }

    /**
     * Works around logs not being collected when this step runs on its own, or when it is
     * preceded only by a single blocking step: a placeholder step is inserted in front to
     * trigger log collection.
     *
     * <p>A placeholder is inserted in exactly two situations:
     * <ul>
     *   <li>the step has no previous step: {@code placeholder -> step};</li>
     *   <li>the first previous step is a BlockingStep that itself has no previous step:
     *       {@code placeholder -> blocking -> step}.</li>
     * </ul>
     * In every other situation the transformation is left untouched.
     *
     * @param step       the visual node whose Kettle step is inspected
     * @param databases  not used by this implementation
     * @param transMeta  transformation to modify
     * @param transGraph flow configuration whose hop list is kept in sync
     * @return always {@code null}
     * @throws Exception if the RandomValue plugin cannot be loaded
     */
    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        StepMeta stepMeta = getStep(transMeta, step.getNodeName());
        String[] prevStepNames = transMeta.getPrevStepNames(stepMeta.getName());

        List<FlowHup> newTransHups = new ArrayList<>();
        if (prevStepNames.length == 0) {
            newTransHups.add(new FlowHup(PLACEHOLDER_STEP_NAME, step.getNodeName()));
        } else if (KettleDataFlowNodeType.BlockingStep.name().equals(getStep(transMeta, prevStepNames[0]).getTypeId())) {
            String blockingStepName = prevStepNames[0];
            if (transMeta.getPrevStepNames(blockingStepName).length == 0) {
                newTransHups.add(new FlowHup(PLACEHOLDER_STEP_NAME, blockingStepName));
                newTransHups.add(new FlowHup(blockingStepName, step.getNodeName()));
            }
        }

        if (newTransHups.isEmpty()) {
            // No placeholder required; avoid loading the RandomValue plugin at all.
            return null;
        }

        // The placeholder must be registered before the hops are built, because the hop
        // endpoints are looked up by name in transMeta.
        StepMeta placeholder = createPlaceholderStep();
        placeholder.setParentTransMeta(transMeta);
        transMeta.getSteps().add(placeholder);

        List<TransHopMeta> hopMetas = newTransHups.stream()
                .map(hup -> new TransHopMeta(getStep(transMeta, hup.getFrom()), getStep(transMeta, hup.getTo())))
                .collect(Collectors.toList());

        // NOTE(review): both setters are handed only the new hops; if they replace the
        // existing hop lists, any other hop in the transformation is dropped — confirm
        // this is intended.
        transMeta.setTransHops(hopMetas);
        transGraph.setTransHups(hopMetas.stream()
                .map(hop -> new FlowHup(hop.getFromStep().getName(), hop.getToStep().getName()))
                .collect(Collectors.toList()));

        return null;
    }

    /**
     * Creates the drawable RandomValue step used as a placeholder. It produces a single
     * field named {@code tmp}.
     */
    private static StepMeta createPlaceholderStep() throws Exception {
        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface plugin = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.RandomValue.name());
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(plugin);

        RandomValueMeta randomValueMeta = (RandomValueMeta) stepMetaInterface;
        randomValueMeta.setFieldName(new String[]{"tmp"});
        // NOTE(review): 1 is presumably one of RandomValueMeta's TYPE_RANDOM_* constants — confirm.
        randomValueMeta.setFieldType(new int[]{1});

        StepMeta placeholder = new StepMeta(KettleDataFlowNodeType.RandomValue.name(), PLACEHOLDER_STEP_NAME, stepMetaInterface);
        placeholder.setDraw(true);
        return placeholder;
    }

    /**
     * Derives the schema from a dotted database name: everything after the first segment,
     * or the whole name when nothing follows the first segment. An empty or {@code null}
     * name yields an empty schema.
     */
    private static String resolveSchema(String databaseName) {
        if (StringUtils.isEmpty(databaseName)) {
            return "";
        }
        String suffix = joinAfterFirstSegment(databaseName);
        return StringUtils.isEmpty(suffix) ? databaseName : suffix;
    }

    /**
     * Splits {@code dottedName} on {@code '.'} and re-joins every segment except the first
     * with {@code '.'}. Returns an empty string when there is at most one segment.
     */
    private static String joinAfterFirstSegment(String dottedName) {
        String[] segments = dottedName.split("\\.");
        StringBuilder joined = new StringBuilder();
        for (int i = 1; i < segments.length; i++) {
            if (i > 1) {
                joined.append('.');
            }
            joined.append(segments[i]);
        }
        return joined.toString();
    }

    /**
     * Manual check of the segment-joining logic; prints the part of {@code "cc"} that
     * follows the first dot-separated segment (an empty line).
     */
    public static void main(String[] args) {
        System.out.println(joinAfterFirstSegment("cc"));
    }
}
